package com.baosight.xdata;


import com.baosight.xdata.dto.TssEventMessageRecord;
import com.baosight.xdata.redis.LettuceUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Map;

@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer implements Runnable{

    /**
     * 同步操作
     * api和jedis很类似
     */
    public void sync() {
        int i = 0;
        String key="";
        String value ="{\n" +
                "\t\"tasks\": [{\n" +
                "\t\t\"id\": \"tss-source-db-ddl-create\",\n" +
                "\t\t\"name\": \"tss-source-db-ddl-create_DAG\",\n" +
                "\t\t\"desc\": null,\n" +
                "\t\t\"type\": \"TSS_SQL\",\n" +
                "\t\t\"runFlag\": \"NORMAL\",\n" +
                "\t\t\"loc\": null,\n" +
                "\t\t\"maxRetryTimes\": 0,\n" +
                "\t\t\"retryInterval\": 1,\n" +
                "\t\t\"params\": {\n" +
                "\t\t\t\"stopOnError\": false,\n" +
                "\t\t\t\"sqlType\": 1,\n" +
                "\t\t\t\"datasource\": 2172,\n" +
                "\t\t\t\"retryDispatchTimeout\": 1,\n" +
                "\t\t\t\"retryDispatchTimes\": 3,\n" +
                "\t\t\t\"type\": \"MYSQL\",\n" +
                "\t\t\t\"autoCommit\": true,\n" +
                "\t\t\t\"resourceFilesList\": [],\n" +
                "\t\t\t\"sql\": \" DROP TABLE tss.ST_01244096 ;\\n Create TABLE tss.ST_01244096 ( T_INT_1  INT NOT NULL DEFAULT 0, T_INT_2  INT NOT NULL DEFAULT 0) ;\\n ;\\n \"\n" +
                "\t\t},\n" +
                "\t\t\"preTasks\": [],\n" +
                "\t\t\"extras\": null,\n" +
                "\t\t\"depList\": [],\n" +
                "\t\t\"dependence\": null,\n" +
                "\t\t\"conditionResult\": null,\n" +
                "\t\t\"taskInstancePriority\": \"MEDIUM\",\n" +
                "\t\t\"workerGroup\": null,\n" +
                "\t\t\"workerGroupId\": null,\n" +
                "\t\t\"timeout\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"interval\": 1440,\n" +
                "\t\t\t\"strategy\": \"WARN\"\n" +
                "\t\t},\n" +
                "\t\t\"forbidden\": false,\n" +
                "\t\t\"subProcessTask\": false,\n" +
                "\t\t\"taskTimeoutParameter\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"strategy\": \"WARN\",\n" +
                "\t\t\t\"interval\": 1440\n" +
                "\t\t},\n" +
                "\t\t\"conditionsTask\": false,\n" +
                "\t\t\"tssConditionTask\": false\n" +
                "\t}, {\n" +
                "\t\t\"id\": \"tss-target-db-ddl-create\",\n" +
                "\t\t\"name\": \"tss-target-db-ddl-create_DAG\",\n" +
                "\t\t\"desc\": null,\n" +
                "\t\t\"type\": \"TSS_SQL\",\n" +
                "\t\t\"runFlag\": \"NORMAL\",\n" +
                "\t\t\"loc\": null,\n" +
                "\t\t\"maxRetryTimes\": 0,\n" +
                "\t\t\"retryInterval\": 1,\n" +
                "\t\t\"params\": {\n" +
                "\t\t\t\"stopOnError\": false,\n" +
                "\t\t\t\"sqlType\": 1,\n" +
                "\t\t\t\"datasource\": 2175,\n" +
                "\t\t\t\"retryDispatchTimeout\": 1,\n" +
                "\t\t\t\"retryDispatchTimes\": 3,\n" +
                "\t\t\t\"type\": \"TSS_DB2\",\n" +
                "\t\t\t\"autoCommit\": true,\n" +
                "\t\t\t\"resourceFilesList\": [],\n" +
                "\t\t\t\"sql\": \" DROP TABLE DB2INST1.TT_01244096 ;\\n Create TABLE DB2INST1.TT_01244096 ( T_INT_1  INTEGER  NOT NULL DEFAULT 0, T_INT_2  INTEGER  NOT NULL DEFAULT 0, T_VARCHAR_1 VARCHAR(50)  NOT NULL DEFAULT '0') ;\\n ;\\n \"\n" +
                "\t\t},\n" +
                "\t\t\"preTasks\": [\"tss-source-db-ddl-create_DAG\"],\n" +
                "\t\t\"extras\": null,\n" +
                "\t\t\"depList\": [\"tss-source-db-ddl-create_DAG\"],\n" +
                "\t\t\"dependence\": null,\n" +
                "\t\t\"conditionResult\": null,\n" +
                "\t\t\"taskInstancePriority\": \"MEDIUM\",\n" +
                "\t\t\"workerGroup\": null,\n" +
                "\t\t\"workerGroupId\": null,\n" +
                "\t\t\"timeout\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"interval\": 1440,\n" +
                "\t\t\t\"strategy\": \"WARN\"\n" +
                "\t\t},\n" +
                "\t\t\"forbidden\": false,\n" +
                "\t\t\"subProcessTask\": false,\n" +
                "\t\t\"taskTimeoutParameter\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"strategy\": \"WARN\",\n" +
                "\t\t\t\"interval\": 1440\n" +
                "\t\t},\n" +
                "\t\t\"conditionsTask\": false,\n" +
                "\t\t\"tssConditionTask\": false\n" +
                "\t}, {\n" +
                "\t\t\"id\": \"tss-source-db-data-insert\",\n" +
                "\t\t\"name\": \"tss-source-db-data-insert_DAG\",\n" +
                "\t\t\"desc\": null,\n" +
                "\t\t\"type\": \"TSS_SQL\",\n" +
                "\t\t\"runFlag\": \"NORMAL\",\n" +
                "\t\t\"loc\": null,\n" +
                "\t\t\"maxRetryTimes\": 0,\n" +
                "\t\t\"retryInterval\": 1,\n" +
                "\t\t\"params\": {\n" +
                "\t\t\t\"stopOnError\": false,\n" +
                "\t\t\t\"sqlType\": 1,\n" +
                "\t\t\t\"datasource\": 2172,\n" +
                "\t\t\t\"retryDispatchTimeout\": 1,\n" +
                "\t\t\t\"localParametersMap\": {\n" +
                "\t\t\t\t\"tss.ST_01244096_T0_T_INT_1\": {\n" +
                "\t\t\t\t\t\"prop\": \"tss.ST_01244096_T0_T_INT_1\",\n" +
                "\t\t\t\t\t\"direct\": \"IN\",\n" +
                "\t\t\t\t\t\"type\": \"INTEGER\",\n" +
                "\t\t\t\t\t\"value\": \"1\"\n" +
                "\t\t\t\t},\n" +
                "\t\t\t\t\"tss.ST_01244096_T0_T_INT_2\": {\n" +
                "\t\t\t\t\t\"prop\": \"tss.ST_01244096_T0_T_INT_2\",\n" +
                "\t\t\t\t\t\"direct\": \"IN\",\n" +
                "\t\t\t\t\t\"type\": \"INTEGER\",\n" +
                "\t\t\t\t\t\"value\": \"2\"\n" +
                "\t\t\t\t}\n" +
                "\t\t\t},\n" +
                "\t\t\t\"retryDispatchTimes\": 3,\n" +
                "\t\t\t\"type\": \"MYSQL\",\n" +
                "\t\t\t\"localParams\": [{\n" +
                "\t\t\t\t\"prop\": \"tss.ST_01244096_T0_T_INT_1\",\n" +
                "\t\t\t\t\"direct\": \"IN\",\n" +
                "\t\t\t\t\"type\": \"INTEGER\",\n" +
                "\t\t\t\t\"value\": \"1\"\n" +
                "\t\t\t}, {\n" +
                "\t\t\t\t\"prop\": \"tss.ST_01244096_T0_T_INT_2\",\n" +
                "\t\t\t\t\"direct\": \"IN\",\n" +
                "\t\t\t\t\"type\": \"INTEGER\",\n" +
                "\t\t\t\t\"value\": \"2\"\n" +
                "\t\t\t}],\n" +
                "\t\t\t\"autoCommit\": true,\n" +
                "\t\t\t\"resourceFilesList\": [],\n" +
                "\t\t\t\"sql\": \" INSERT INTO tss.ST_01244096 (T_INT_1,T_INT_2) values (${tss.ST_01244096_T0_T_INT_1},${tss.ST_01244096_T0_T_INT_2}) ;\\n\"\n" +
                "\t\t},\n" +
                "\t\t\"preTasks\": [\"tss-target-db-ddl-create_DAG\"],\n" +
                "\t\t\"extras\": null,\n" +
                "\t\t\"depList\": [\"tss-target-db-ddl-create_DAG\"],\n" +
                "\t\t\"dependence\": null,\n" +
                "\t\t\"conditionResult\": null,\n" +
                "\t\t\"taskInstancePriority\": \"MEDIUM\",\n" +
                "\t\t\"workerGroup\": null,\n" +
                "\t\t\"workerGroupId\": null,\n" +
                "\t\t\"timeout\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"interval\": 1440,\n" +
                "\t\t\t\"strategy\": \"WARN\"\n" +
                "\t\t},\n" +
                "\t\t\"forbidden\": false,\n" +
                "\t\t\"subProcessTask\": false,\n" +
                "\t\t\"taskTimeoutParameter\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"strategy\": \"WARN\",\n" +
                "\t\t\t\"interval\": 1440\n" +
                "\t\t},\n" +
                "\t\t\"conditionsTask\": false,\n" +
                "\t\t\"tssConditionTask\": false\n" +
                "\t}, {\n" +
                "\t\t\"id\": \"tss-target-db-data-insert\",\n" +
                "\t\t\"name\": \"tss-target-db-data-insert_DAG\",\n" +
                "\t\t\"desc\": null,\n" +
                "\t\t\"type\": \"TSS_SQL\",\n" +
                "\t\t\"runFlag\": \"NORMAL\",\n" +
                "\t\t\"loc\": null,\n" +
                "\t\t\"maxRetryTimes\": 0,\n" +
                "\t\t\"retryInterval\": 1,\n" +
                "\t\t\"params\": {\n" +
                "\t\t\t\"stopOnError\": false,\n" +
                "\t\t\t\"sqlType\": 1,\n" +
                "\t\t\t\"datasource\": 2175,\n" +
                "\t\t\t\"retryDispatchTimeout\": 1,\n" +
                "\t\t\t\"retryDispatchTimes\": 3,\n" +
                "\t\t\t\"type\": \"TSS_DB2\",\n" +
                "\t\t\t\"autoCommit\": true,\n" +
                "\t\t\t\"resourceFilesList\": [],\n" +
                "\t\t\t\"sql\": \"  ;\\n\"\n" +
                "\t\t},\n" +
                "\t\t\"preTasks\": [\"tss-source-db-data-insert_DAG\"],\n" +
                "\t\t\"extras\": null,\n" +
                "\t\t\"depList\": [\"tss-source-db-data-insert_DAG\"],\n" +
                "\t\t\"dependence\": null,\n" +
                "\t\t\"conditionResult\": null,\n" +
                "\t\t\"taskInstancePriority\": \"MEDIUM\",\n" +
                "\t\t\"workerGroup\": null,\n" +
                "\t\t\"workerGroupId\": null,\n" +
                "\t\t\"timeout\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"interval\": 1440,\n" +
                "\t\t\t\"strategy\": \"WARN\"\n" +
                "\t\t},\n" +
                "\t\t\"forbidden\": false,\n" +
                "\t\t\"subProcessTask\": false,\n" +
                "\t\t\"taskTimeoutParameter\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"strategy\": \"WARN\",\n" +
                "\t\t\t\"interval\": 1440\n" +
                "\t\t},\n" +
                "\t\t\"conditionsTask\": false,\n" +
                "\t\t\"tssConditionTask\": false\n" +
                "\t}, {\n" +
                "\t\t\"id\": \"ci_extractor_test0\",\n" +
                "\t\t\"name\": \"ci_extractor_test0_DAG\",\n" +
                "\t\t\"desc\": \"Mon Nov 28 15:12:54 CST 2022\",\n" +
                "\t\t\"type\": \"TSS_EXTRACT\",\n" +
                "\t\t\"runFlag\": \"NORMAL\",\n" +
                "\t\t\"loc\": null,\n" +
                "\t\t\"maxRetryTimes\": 0,\n" +
                "\t\t\"retryInterval\": 1,\n" +
                "\t\t\"params\": {\n" +
                "\t\t\t\"args\": \"{\\\"enable_batch_commit\\\":null,\\\"query_fetch_count\\\":null,\\\"insert_batch_count\\\":null,\\\"clear_cache\\\":null,\\\"error_limit_count\\\":null,\\\"error_limit_percentage\\\":null,\\\"decimal_rounding_mode\\\":null,\\\"source_data_source_name\\\":\\\"test_ci_mysql\\\",\\\"target_data_source_name\\\":\\\"test_ci_db2\\\",\\\"mappings\\\":[{\\\"sql_SPLITTER\\\":\\\";\\\\n\\\",\\\"source_db_schema\\\":\\\"tss\\\",\\\"source_table\\\":\\\"ST_01244096\\\",\\\"source_columns\\\":\\\"T_INT_1,T_INT_2,\\\\\\\"\\\\\\\"\\\\\\\"substr('cdnjfdfjd',5,2)\\\\\\\"\\\\\\\"\\\\\\\"\\\",\\\"target_db_schema\\\":\\\"DB2INST1\\\",\\\"target_table\\\":\\\"TT_01244096\\\",\\\"target_columns\\\":\\\"T_INT_1,T_INT_2,T_VARCHAR_1\\\",\\\"enable_target_table_pre_delete\\\":\\\"\\\",\\\"target_table_pre_delete_condition\\\":\\\"\\\",\\\"is_incremental_extraction\\\":\\\"\\\",\\\"version_column\\\":\\\"\\\",\\\"version_value_type\\\":\\\"\\\",\\\"time_format\\\":\\\"\\\",\\\"version_value_offset\\\":\\\"\\\",\\\"target_unique_columns\\\":\\\"\\\",\\\"source_table_extraction_condition\\\":\\\"\\\",\\\"asymmetric_field_test\\\":\\\" this is an asymmetric field test\\\"}]}\",\n" +
                "\t\t\t\"retryDispatchTimeout\": 1,\n" +
                "\t\t\t\"retryDispatchTimes\": 3,\n" +
                "\t\t\t\"dataSourceList\": [{\n" +
                "\t\t\t\t\"datasource\": 2172,\n" +
                "\t\t\t\t\"type\": \"MYSQL\"\n" +
                "\t\t\t}, {\n" +
                "\t\t\t\t\"datasource\": 2175,\n" +
                "\t\t\t\t\"type\": \"TSS_DB2\"\n" +
                "\t\t\t}],\n" +
                "\t\t\t\"resourceFilesList\": []\n" +
                "\t\t},\n" +
                "\t\t\"preTasks\": [\"tss-target-db-data-insert_DAG\"],\n" +
                "\t\t\"extras\": null,\n" +
                "\t\t\"depList\": [\"tss-target-db-data-insert_DAG\"],\n" +
                "\t\t\"dependence\": null,\n" +
                "\t\t\"conditionResult\": null,\n" +
                "\t\t\"taskInstancePriority\": \"MEDIUM\",\n" +
                "\t\t\"workerGroup\": null,\n" +
                "\t\t\"workerGroupId\": null,\n" +
                "\t\t\"timeout\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"interval\": 1440,\n" +
                "\t\t\t\"strategy\": \"WARN\"\n" +
                "\t\t},\n" +
                "\t\t\"forbidden\": false,\n" +
                "\t\t\"subProcessTask\": false,\n" +
                "\t\t\"taskTimeoutParameter\": {\n" +
                "\t\t\t\"enable\": true,\n" +
                "\t\t\t\"strategy\": \"WARN\",\n" +
                "\t\t\t\"interval\": 1440\n" +
                "\t\t},\n" +
                "\t\t\"conditionsTask\": false,\n" +
                "\t\t\"tssConditionTask\": false\n" +
                "\t}],\n" +
                "\t\"globalParams\": [],\n" +
                "\t\"timeout\": 1440,\n" +
                "\t\"timeoutStrategy\": \"WARN\",\n" +
                "\t\"tenantId\": 0,\n" +
                "\t\"alias\": null,\n" +
                "\t\"resourceConsumption\": {\n" +
                "\t\t\"thread\": 1,\n" +
                "\t\t\"memory\": 256\n" +
                "\t},\n" +
                "\t\"consumptionWaitingTimeout\": 7200\n" +
                "}";
        for (;;){
            java.util.Random random=new java.util.Random();
            int id = random.nextInt(9);
            TssEventMessageRecord tssEventMessageRecord = new TssEventMessageRecord();
            tssEventMessageRecord.setTopologyInstanceId(id);
            tssEventMessageRecord.setProcessDefinitionId(id);
            tssEventMessageRecord.setProcessInstanceId(id);
            tssEventMessageRecord.setPartition(id);
            tssEventMessageRecord.setState(id);
            //Map<String, String> body =  Collections.singletonMap("message" + id, "value" + value);

            try{
                if(id%2==1){
                    key = LettuceUtil.xadd("tss-stream:1", tssEventMessageRecord);
                }/*else if(id%2==2){
                    key = LettuceUtil.xadd("tss-stream:2", tssEventMessageRecord.convert2Map());
                }*/else {
                    key = LettuceUtil.xadd("tss-stream:2", tssEventMessageRecord);
                }
            }catch (Exception e){
                System.out.println("======"+e);
            }


            System.out.println(key);

           /* try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
        }
    }

    @Override
    public void run() {
        try {
            sync();
        }catch (Exception e){
            //e.printStackTrace();
            System.out.println("----"+e);
            //System.exit(-1);
        }
    }

}
